package com.bw.yk03;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Date;

//DwsBean(stt=2020-12-22 14:10:10, edt=2020-12-22 14:10:20, provinceId=4, provinceName=内蒙古, skuId=33, skuName=香奈儿（Chanel）女士香水5号香水 粉邂逅柔情淡香水EDT 粉邂逅淡香水35ml, orderCnt=1, orderAmount=976.0, skuCnt=2)
/*
create table yk9sink (
                                     stt DateTime,
                                     edt DateTime,
                                     province_id  String,
                                     province_name String,
                                     sku_id String,
                                     sku_name String,
                                     order_cnt Int32 ,
                                     order_amount Float64,
                                     sku_cnt Int64 ,
                                     ts Int64
)engine =ReplacingMergeTree(ts)
     partition by  toYYYYMMDD(stt)
     order by   (stt,edt,province_id,sku_id);


     select * from (
select *,row_number() over(partition by province_id,province_name order by ordercnts desc,skucnts desc) rk from (
select province_id,province_name, sku_id,sku_name,sum(order_cnt) ordercnts,sum(sku_cnt) skucnts from yk9sink group by province_id,province_name,sku_id,sku_name))
where rk<=5;

 */
/**
 * Flink sink that writes each {@link DwsBean} as one row into the ClickHouse
 * table {@code yk9sink} through JDBC.
 *
 * <p>The ten positional parameters of the insert follow the column order of the
 * DDL in the comment above this class: stt, edt, province_id, province_name,
 * sku_id, sku_name, order_cnt, order_amount, sku_cnt, ts.
 *
 * <p>The connection and statement are created in {@link #open(Configuration)}
 * and released in {@link #close()}; one statement is reused for every record.
 */
public class FlinkDWSSinkClickhouse extends RichSinkFunction<DwsBean> {
    /** JDBC driver class loaded in {@link #open(Configuration)}. */
    private static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";
    /** JDBC URL of the target ClickHouse database. */
    private static final String JDBC_URL = "jdbc:clickhouse://hadoop-single:8123/default";
    /** User name passed to the driver. */
    private static final String JDBC_USER = "default";
    /** Password passed to the driver (empty). */
    private static final String JDBC_PASSWORD = "";
    /** Positional insert with one placeholder per {@code yk9sink} column. */
    private static final String INSERT_SQL = "insert into yk9sink values (?,?,?,?,?,?,?,?,?,?)";

    // Runtime-only JDBC handles: created in open(), released in close().
    // Marked transient so they are never part of the serialized function.
    transient Connection conn = null;
    transient PreparedStatement ps = null;


    /**
     * Loads the ClickHouse JDBC driver, opens the connection and prepares the
     * insert statement.
     *
     * @param parameters configuration handed in by the caller; not read here
     * @throws Exception if the driver class cannot be loaded, the connection
     *                   cannot be established, or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(DRIVER_CLASS);
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        try {
            ps = conn.prepareStatement(INSERT_SQL);
        } catch (Exception prepareFailure) {
            // Do not leave a half-initialised sink holding an open connection.
            try {
                conn.close();
            } catch (Exception closeFailure) {
                prepareFailure.addSuppressed(closeFailure);
            }
            conn = null;
            throw prepareFailure;
        }
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * <p>Safe to call when {@link #open(Configuration)} failed or never ran
     * (both handles may still be {@code null}), and the connection is closed
     * even if closing the statement throws.
     *
     * @throws Exception if closing the statement or the connection fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            if (conn != null) {
                try {
                    conn.close();
                } finally {
                    conn = null;
                }
            }
        }
    }

    /**
     * Binds the fields of one record to the prepared insert and executes it.
     *
     * @param value   the record to write
     * @param context sink context supplied by the caller; not used here
     * @throws Exception if binding a parameter or executing the insert fails
     */
    @Override
    public void invoke(DwsBean value, Context context) throws Exception {
        ps.setString(1, value.getStt());
        ps.setString(2, value.getEdt());
        ps.setString(3, value.getProvinceId());
        ps.setString(4, value.getProvinceName());
        ps.setString(5, value.getSkuId());
        ps.setString(6, value.getSkuName());
        ps.setInt(7, value.getOrderCnt());
        ps.setDouble(8, value.getOrderAmount());
        ps.setLong(9, value.getSkuCnt());
        // ts column: wall-clock write time in epoch milliseconds. Per the DDL
        // comment above the class this is the ReplacingMergeTree version column.
        ps.setLong(10, System.currentTimeMillis());
        // NOTE(review): per-record stdout dump kept to preserve existing output;
        // what toString() prints depends on the JDBC driver. Consider removing
        // it or routing it through a logger.
        System.out.println(ps.toString());
        ps.execute();
    }
}
